package com.wichell.framework.rocketmq.example.filter;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * Example push consumer that subscribes to {@code TopicFilter} with a filter
 * class whose Java source is read from the classpath and handed to the
 * consumer's {@code subscribe} call.
 *
 * <p>Every received message is printed to standard output together with the
 * name of the consuming thread.
 */
public class FilterConsumer {

    /** Classpath resource that holds the Java source of the message filter. */
    private static final String FILTER_SOURCE_RESOURCE = "MessageFilterImpl.java";

    /**
     * Configures the consumer, subscribes with the filter source, registers a
     * listener that prints each message, and starts the consumer.
     *
     * @param args unused
     * @throws InterruptedException declared for interface compatibility
     * @throws MQClientException    if subscribing to the topic or starting the consumer fails
     * @throws IOException          if the filter source resource is missing or cannot be read
     */
    public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerFilter");
        consumer.setNamesrvAddr("10.138.61.57:9876;10.138.61.59:9876;");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        String filterCode = loadFilterSource();
        System.out.println(filterCode);

        // NOTE(review): the class name passed here presumably has to match the
        // class declared inside MessageFilterImpl.java -- confirm against that file.
        consumer.subscribe("TopicFilter", "org.apache.rocketmq.example.filter.MessageFilterImpl",
            filterCode);

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    // Runtime data goes in as %s arguments, never into the format
                    // string itself: a '%' inside a message body would otherwise be
                    // parsed as a format specifier and make printf throw.
                    System.out.printf("%s Receive New Messages: %s-------%s%n",
                        Thread.currentThread().getName(),
                        msg,
                        new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }

    /**
     * Reads the filter's Java source from the classpath resource
     * {@link #FILTER_SOURCE_RESOURCE}.
     *
     * @return the filter source code, never {@code null}
     * @throws IOException if the resource is not on the classpath or its
     *                     content could not be read
     */
    private static String loadFilterSource() throws IOException {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        URL resource = classLoader.getResource(FILTER_SOURCE_RESOURCE);
        if (resource == null) {
            // getResource signals "not found" with null; report it explicitly
            // instead of failing with a bare NullPointerException.
            throw new IOException("Classpath resource not found: " + FILTER_SOURCE_RESOURCE);
        }

        // NOTE(review): URL#getFile returns the path in its URL-encoded form, so a
        // classpath location containing spaces or other escaped characters may not
        // resolve to an existing file.
        File classFile = new File(resource.getFile());

        String filterCode = MixAll.file2String(classFile);
        if (filterCode == null) {
            // Defensive: fail here rather than subscribing with a null filter source.
            throw new IOException("Could not read filter source from " + classFile);
        }
        return filterCode;
    }
}
